/** 
 *
 * License:
 * Copyright (c) 2015.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * 
 * 
 * @fileDesc:TODO
 * @author:dairymix
 * @version:1.0.0
 */
package com.jfinal.plugin.zbus.kit;

import com.jfinal.plugin.zbus.core.Sender;
import com.jfinal.plugin.zbus.type.SendType;
import org.zbus.broker.Broker;
import org.zbus.broker.BrokerConfig;
import org.zbus.broker.SingleBroker;
import org.zbus.mq.Producer;

import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/**
 * Utility methods for common zbus operations: obtaining a shared broker,
 * sending objects through a {@link Sender}, and removing a message queue.
 *
 * @author dairymix
 * @version 1.0.0
 */
public class ZbusKit {

    /**
     * Broker shared by all methods of this class. It is created on first use
     * and then reused; once it is set, the address passed to later calls is
     * not consulted again.
     */
    public static Broker broker;

    /**
     * Returns the shared broker, creating it for {@code brokerAddress} when
     * none exists yet.
     *
     * <p>This method keeps its historical best-effort contract: a connection
     * failure is printed to standard error and reported as {@code null}
     * rather than thrown.
     *
     * @param brokerAddress address used to create the broker on first use
     * @return the shared broker, or {@code null} if it could not be created
     */
    public static Broker ensureBroker(String brokerAddress){
        try {
            return connect(brokerAddress);
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Returns the shared broker, creating it if necessary, and propagates the
     * creation failure to the caller.
     *
     * <p>Synchronized so that concurrent first callers cannot each create
     * (and leak) their own broker instance.
     *
     * @param brokerAddress address used to create the broker on first use
     * @return the shared broker, never {@code null}
     * @throws IOException if the broker cannot be created
     */
    private static synchronized Broker connect(String brokerAddress) throws IOException {
        if (broker == null) {
            BrokerConfig brokerConfig = new BrokerConfig();
            brokerConfig.setBrokerAddress(brokerAddress);
            broker = new SingleBroker(brokerConfig);
        }
        return broker;
    }

    /**
     * Sends {@code object} synchronously by delegating to
     * {@link Sender#sendSync} with the shared broker.
     *
     * @param brokerAddress address used to create the broker on first use
     * @param sender        sender that performs the actual delivery
     * @param producerNum   passed through to the sender unchanged
     * @param senderNum     passed through to the sender unchanged
     * @param object        payload to send
     * @throws IOException          if the broker cannot be created or the sender fails
     * @throws InterruptedException if the sender is interrupted
     */
    public static <T> void sendSync(String brokerAddress, Sender<T> sender,
                                    Integer producerNum, Integer senderNum, T object) throws IOException, InterruptedException{
        // connect() throws instead of handing a null broker to the sender.
        sender.sendSync(connect(brokerAddress), producerNum, senderNum, object);
    }

    /**
     * Sends {@code object} asynchronously by delegating to
     * {@link Sender#sendAsync} with the shared broker.
     *
     * @param brokerAddress address used to create the broker on first use
     * @param sender        sender that performs the actual delivery
     * @param producerNum   passed through to the sender unchanged
     * @param senderNum     passed through to the sender unchanged
     * @param object        payload to send
     * @throws IOException          if the broker cannot be created or the sender fails
     * @throws InterruptedException if the sender is interrupted
     */
    public static <T> void sendAsync(String brokerAddress, Sender<T> sender,
                                     Integer producerNum, Integer senderNum ,T object) throws IOException, InterruptedException{
        sender.sendAsync(connect(brokerAddress), producerNum, senderNum, object);
    }

    /**
     * Sends {@code object} using the requested mode and never throws a checked
     * exception: failures are printed and the method returns normally.
     *
     * <p>{@link SendType#async} selects {@link #sendAsync}; every other value,
     * including {@code null}, selects {@link #sendSync}.
     *
     * @param sendType      delivery mode; anything other than async means sync
     * @param brokerAddress address used to create the broker on first use
     * @param sender        sender that performs the actual delivery
     * @param producerNum   passed through to the sender unchanged
     * @param senderNum     passed through to the sender unchanged
     * @param object        payload to send
     */
    public static <T> void send(SendType sendType, String brokerAddress, Sender<T> sender,
                                Integer producerNum, Integer senderNum, T object) {
        try {
            // Comparing with == keeps a null sendType on the sync path instead
            // of failing inside a switch statement.
            if (sendType == SendType.async) {
                sendAsync(brokerAddress, sender, producerNum, senderNum, object);
            } else {
                sendSync(brokerAddress, sender, producerNum, senderNum, object);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can react.
            Thread.currentThread().interrupt();
            reportSendFailure(object, e);
        } catch (IOException e) {
            reportSendFailure(object, e);
        }
    }

    /**
     * Prints the failure of a send attempt: the stack trace first, then a
     * one-line message naming the payload type.
     */
    private static void reportSendFailure(Object object, Exception e) {
        e.printStackTrace();
        // A null payload must not hide the original failure behind an NPE.
        String typeName = (object == null) ? "null" : object.getClass().getSimpleName();
        System.out.println(typeName + " send err..");
    }

    /**
     * Removes the message queue named {@code mq} through a {@link Producer}
     * bound to the shared broker.
     *
     * @param brokerAddress address used to create the broker on first use
     * @param mq            name of the queue to remove
     * @return the result reported by {@link Producer#removeMQ()}
     * @throws IOException          if the broker cannot be created or the removal fails
     * @throws InterruptedException if the removal is interrupted
     */
    public static boolean rmvMQ(String brokerAddress,String mq) throws IOException, InterruptedException {
        Producer producer = new Producer(connect(brokerAddress), mq);
        return producer.removeMQ();
    }

    /**
     * Returns the first actual type argument declared on the generic
     * superclass of {@code clazz}.
     *
     * @param clazz class whose generic superclass is inspected
     * @return the class used as the first type argument of the superclass
     * @throws RuntimeException if the superclass is not parameterized, or if
     *                          its first type argument is not a concrete class
     */
    @SuppressWarnings("rawtypes")
    public static Class getSuperClassGenricType(Class<?> clazz) {
        Type genType = clazz.getGenericSuperclass();
        if (!(genType instanceof ParameterizedType)) {
            throw new RuntimeException(clazz.getSimpleName() + "'s superclass not ParameterizedType");
        }
        Type[] params = ((ParameterizedType) genType).getActualTypeArguments();

        if (!(params[0] instanceof Class)) {
            throw new RuntimeException(
                    clazz.getSimpleName() + " not set the actual class on superclass generic parameter");
        }
        return (Class<?>) params[0];
    }

}
